-
Notifications
You must be signed in to change notification settings - Fork 3.6k
[improve] [test] Add a test to verify no orphan consumers if release lock caused by checkConnectionLiveness #21498
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
[improve] [test] Add a test to verify no orphan consumers if release lock caused by checkConnectionLiveness #21498
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great initiative to add a test for the described scenario. I provided some review feedback mainly about the new test scaffolding.
import org.apache.pulsar.common.api.proto.CommandWatchTopicUpdate; | ||
import org.apache.pulsar.common.util.netty.EventLoopUtil; | ||
|
||
public class InjectedClientCnxClientBuilder { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great idea to have a separate class for this.
ClientConfigurationData conf = clientBuilder.getClientConfigurationData(); | ||
ThreadFactory threadFactory = new ExecutorProvider | ||
.ExtendedThreadFactory("pulsar-client-io", Thread.currentThread().isDaemon()); | ||
EventLoopGroup eventLoopGroup = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This would currently leak threads since it doesn't get closed. One possibility would be to override the closeAsync and shutdown methods of PulsarClientImpl like it was done in #21468 for PulsarTestClient.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed, I created a ulsarTestClient
directly.
new ClientCnxCustomizer(Type.PING){ | ||
@Override | ||
public void handleCommand(Object command) { | ||
if (command instanceof CommandPing) { | ||
// do not response anything. | ||
} | ||
} | ||
}); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this ClientCnxCustomizer any easier than providing an actual anonymous ClientCnx class?
Passing a BiFunction<ClientConfigurationData, EventLoopGroup, ClientCnx>
as a parameter in the InjectedClientCnxClientBuilder
should be sufficient to support anything that is needed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think your suggestion is better.
I created a new implementation here, after the PR merged, I will back to finish this one.
Thanks
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
return new PulsarClientImpl(conf, eventLoopGroup, pool); | ||
} | ||
|
||
public static abstract class ClientCnxCustomizer { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't see a reason to have ClientCnxCustomizer and InjectedClientCnx. Simply providing a BiFunction<ClientConfigurationData, EventLoopGroup, ClientCnx>
parameter for creating the ClientCnx instance should be sufficient.
For example, you'd use it somewhat like this:
.clientCnxFunction((conf, eventLoopGroup) -> {
return new ClientCnx(conf, eventLoopGroup) {
@Override
protected void handlePing(CommandPing ping) {
// do not respond to CommandPing
}
};
});
The PulsarTestClient shows how it could be implemented. PulsarTestClient is specialized for a certain test, but it contains some useful ideas how clientCnxFunction could be used to instantiate the ClientCnx when it's needed.
// Wait for all commands of the consumer c1 have been handled. To avoid the Broker mark the connection is active | ||
// after it receive anything. | ||
Thread.sleep(1000); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how do we know that 1000ms is sufficient?
b9a97a7
to
568431c
Compare
Motivation & Modifications
Add a new test used to verify that the Broker will not leave an orphan consumer in the scenario below:
subscription.
This task will be executed in another thread, which means it will release the lock
Synchronized(dispatcher)
by the Dispatcher.
(Highlight) Race condition: if the connection of "consumer-2" went to a wrong state before step 4,
"consumer-2" is maintained by the Subscription and not maintained by the Dispatcher. Would the scenario below
will happen?
nothing.
consumer under the Dispatcher.
Why there is no orphan consumer?
There are two mechanisms to avoid orphan consumers:
consumerFuture.complet(v)
returnedfalse
, broker will remove the consumer again. see https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java#L1271-L1285PersistentTopic.java
ServerCnx.java
Documentation
doc
doc-required
doc-not-needed
doc-complete
Matching PR in forked repository
PR in forked repository: x